package com.nx.platform.es.system.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.nx.arch.jodis.RoundRobinJedisPool;
import com.nx.arch.redis.clients.jedis.Jedis;
import com.nx.arch.redis.clients.jedis.Pipeline;
import com.nx.arch.redis.clients.jedis.Response;
import com.nx.arch.redis.clients.util.SafeEncoder;
import com.nx.platform.es.common.utils.Constants;
import com.nx.platform.es.common.utils.YamlParser;
import com.nx.platform.es.system.config.ConfigCenter;

import lombok.extern.slf4j.Slf4j;

/**
 * redis  = codis
 * Created by yanglian on 2018/5/16.
 */
@Slf4j
public class RedisUtil {

    static RoundRobinJedisPool redisPool;

    static {
        try {
            String zkAddr = loadCodisConfig().get(Constants.CODIS_ZK_ADDRESS);
            Integer zkSessionTimeoutMs = Integer.valueOf(loadCodisConfig().get(Constants.CODIS_ZK_TIMEOUT));
            Integer connectTimeoutMs = Integer.valueOf(loadCodisConfig().get(Constants.CODIS_ZK_CON_TIMEOUT));
            Integer soTimeoutMs =Integer.valueOf(loadCodisConfig().get(Constants.CODIS_ZK_SO_TIMEOUT));
            String zkProxyDir = loadCodisConfig().get(Constants.CODIS_ZK_DIR);
            String team = loadCodisConfig().get(Constants.CODIS_TEAM);
            String appKey = loadCodisConfig().get(Constants.CODIS_APPKEY);
            String password = loadCodisConfig().get(Constants.CODIS_PASSWORD);;
            redisPool = RoundRobinJedisPool.create().curatorClient(zkAddr, zkSessionTimeoutMs).zkProxyDir(zkProxyDir).
                    team(team).connectionTimeoutMs(connectTimeoutMs).soTimeoutMs(soTimeoutMs).appKey(appKey).
                    password(password).build();
        } catch (Exception e) {
            log.error(" desc=RedisInitErr e=", e.getMessage());
        }

    }

    private static Map<String, String> loadCodisConfig() {
        return (Map<String, String>) ConfigCenter.getConfig(Constants.CODIS_CONFIG_KEY, YamlParser::parseToMap)
                .orElse(new HashMap<>());
    }

    public static Jedis getRedis() {
        return redisPool.getResource();
    }

    public static List<String> redisMget(List<String> keys, String logPrefix) {
        Jedis jedis = getRedis();
        List<String> result = new ArrayList<String>();
        try {
            for (String key : keys) {
                String re = jedis.get(key);
                log.debug(logPrefix, " desc=jedisGet key=", key, " result=", re);
                result.add(jedis.get(re));
            }
        } catch (Exception e) {
            log.error(logPrefix, " desc=jedisErr e=", e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return result;
    }

    public static void addString(String key, String val, int expire, String logStr) {
        Jedis jedis = redisPool.getResource();
        try {
            jedis.setex(key, expire, val);
        } catch (Exception e) {
            log.error(logStr, e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

    public static String getString(String key, String logStr) {
        Jedis jedis = redisPool.getResource();
        String value = null;
        try {
            value = jedis.get(key);
        } catch (Exception e) {
            log.error(logStr, e.getMessage());
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
        return value;
    }


    public static Long mdel(List<String> keyList, String logStr) {
        Jedis jedis = null;
        Long result = null;
        try {
            jedis = redisPool.getResource();
            result = jedis.del(keyList.toArray(new String[0]));
        } catch (Exception e) {
            log.error(logStr, e.getMessage());
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return result;
    }


    public static boolean exists(final String key, final String logStr) {
        boolean bool = false;
        //双写
        Jedis jedis = null;
        try {
            jedis = getRedis();
            bool = jedis.exists(key);
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return bool;
    }

    public static long setNx(final String key, final String value, final String logStr) {

        Jedis jedis = null;
        try {
            jedis = getRedis();
            return jedis.setnx(key.getBytes(), object2Bytes(value));
        } catch (Exception e) {
            log.error(logStr, e);
            throw new RuntimeException(e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
    }

    public static void remove(final String key, final String logStr) {

        Jedis jedis = null;
        try {
            jedis = getRedis();
            jedis.del(key);
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
    }


    public static long increment(final String key, final int seconds, final String logStr) {

        Jedis jedis = getRedis();
        try {
            if (seconds > 0) {
                jedis.expire(key, seconds);
            }
            return jedis.incr(key);
        } catch (Exception e) {
            log.error(logStr, e);
            throw new RuntimeException(e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
    }

    public long incrementRedisMapBy(final String key, final String field, final Long value, final int seconds, final String logStr) {

        Jedis jedis = null;
        long retNew = 0;
        try {
            jedis = getRedis();
            retNew = jedis.hincrBy(key, field, value);
            log.info(logStr, "desc=compare new=", retNew);
            if (seconds > 0) {
                jedis.expire(key.getBytes(), seconds);// 设置过期时间
            }
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return retNew;
    }

    public static String getStringFromRedis(final String key, final String logStr) {

        Jedis jedis = getRedis();
        try {
            return jedis.get(key);
        } catch (Exception e) {
            log.error(logStr, e);
            throw new RuntimeException(e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
    }

    public static void putStringToRedis(final String key, final String value, final int seconds, final String logStr) {

        Jedis jedis = getRedis();
        try {
            jedis.set(key.getBytes(), value.getBytes());
            if (seconds > 0) {
                jedis.expire(key, seconds);// 设置过期时间
            }
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
    }

    public static Object get(final String key, final String logStr) {
        Jedis jedis = getRedis();
        try {
            byte[] obj = jedis.get(key.getBytes());

            return bytes2Object(obj);
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return null;
    }

    private static Object bytes2Object(byte[] objBytes) throws Exception {
        if (objBytes == null || objBytes.length == 0) {
            return null;
        }
        ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
        ObjectInputStream oi = new ObjectInputStream(bi);
        Object obj = oi.readObject();
        bi.close();
        oi.close();
        return obj;
    }


    public static void putToRedis(final String key, final Serializable value, final int seconds, final String logStr) {

        Jedis jedis = getRedis();
        try {
            jedis.set(key.getBytes(), object2Bytes(value));
            if (seconds > 0) {
                jedis.expire(key.getBytes(), seconds);// 设置过期时间
            }
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
    }

    private static byte[] object2Bytes(Serializable obj) throws Exception {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        ObjectOutputStream oo = new ObjectOutputStream(bo);
        oo.writeObject(obj);
        bo.close();
        oo.close();
        return bo.toByteArray();
    }

    public static long putToRedisList(final String logStr, final String key, final boolean isR, final int seconds,
                                      final Serializable... entry) {
        if (entry.length == 0) {
            return -1;
        }

        Jedis jedis = null;

        long ret = 0;
        try {
            jedis = getRedis();
            List<byte[]> valuesL = new ArrayList<byte[]>();
            for (Serializable entity : entry) {
                valuesL.add(object2Bytes(entity));
            }
            if (isR) {
                ret = jedis.rpush(key.getBytes(), valuesL.toArray(new byte[0][]));
            } else {
                ret = jedis.lpush(key.getBytes(), valuesL.toArray(new byte[0][]));
            }
            if (seconds > 0) {
                jedis.expire(key.getBytes(), seconds);// 设置过期时间
            }
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return ret;
    }


    public static List<Object> getFromRedisList(final String key, final long start, final long end, final String logStr) {

        Jedis jedis = null;
        List<Object> ret = new ArrayList<Object>();
        try {
            jedis = getRedis();
            List<byte[]> tmp = jedis.lrange(key.getBytes(), start, end);
            for (int i = 0; i < tmp.size(); i++) {
                ret.add(bytes2Object(tmp.get(i)));
            }
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return ret;
    }


    public static Set<String> getAllFromRedisSet(final String key, final String logStr) {

        Jedis jedis = null;
        Set<String> retSet;
        try {
            jedis = getRedis();
            retSet = jedis.smembers(key);
        } catch (Exception e) {
            log.error(logStr, e);
            throw new RuntimeException(e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return retSet;

    }

    public static long putStringToRedisSet(final String key, final String value, final int seconds, final String logStr) {

        Jedis jedis = null;
        try {
            jedis = getRedis();
            long count = jedis.sadd(key, value);
            if (seconds > 0) {
                jedis.expire(key, seconds);// 设置过期时间
            }
            return count;
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return 0L;
    }

    public static long deleteFromRedisSet(final String key, final String value, final String logStr) {

        Jedis jedis = null;
        try {
            jedis = getRedis();
            jedis.srem(key, value);
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return 0;
    }

    public static long getRedisSetSize(final String key, final String logStr) {

        Jedis jedis = null;
        try {
            jedis = getRedis();
            return jedis.scard(key.getBytes()).longValue();
        } catch (Exception e) {
            log.error(logStr, e);
            throw new RuntimeException(e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
    }

    public static Long removeMemberFromZSet(final String key, final String logStr, final String... members) {
        Jedis jedis = null;
        //正常但是没有数据情况返回empty
        Long ret = null;
        try {
            jedis = getRedis();
            ret = jedis.zrem(key, members);
        } catch (Exception e) {
            log.error(logStr, e);
            throw new RuntimeException(e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return ret;
    }


    public static boolean isElementExistInRedisSet(final String key, final String value, final String logStr) {
        Jedis jedis = null;
        try {
            jedis = getRedis();
            return jedis.sismember(key.getBytes(), object2Bytes(value));
        } catch (Exception e) {
            log.error(logStr, e);
            throw new RuntimeException(e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
    }

    public static Map<String, Object> getHMAll(final String key, final String logStr) {

        Jedis jedis = null;
        Map<byte[], byte[]> ret = null;
        Map<String, Object> hash = new HashMap<String, Object>();
        try {
            jedis = getRedis();
            ret = jedis.hgetAll(key.getBytes());

            Iterator iter = ret.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<byte[], byte[]> entry = (Map.Entry<byte[], byte[]>) iter.next();
                hash.put(SafeEncoder.encode(entry.getKey()), bytes2Object(entry.getValue()));
            }
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }

        return hash;
    }

    public static long delStringFromRedisMap(final String key, final String field, final String logStr) {

        Jedis jedis = null;
        long ret = 0;
        try {
            jedis = getRedis();
            ret = jedis.hdel(key, field);
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return ret;
    }

    public static Map<String, String> getMapFromRedisMap(final String key, final String logStr) {

        Jedis jedis = null;
        Map<String, String> ret = null;
        try {
            jedis = getRedis();
            ret = jedis.hgetAll(key);
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return ret;
    }

    public static long putStringToRedisMap(final String key, final String field, final String value, final int seconds, final String logStr) {

        Jedis jedis = null;
        long ret = 0;
        try {
            jedis = getRedis();
            ret = jedis.hset(key, field, value);
            if (seconds > 0) {
                jedis.expire(key.getBytes(), seconds);// 设置过期时间
            }
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return ret;
    }

    public static Long addStringToRedisZSet(final String key, final long score, final String value, final int seconds, final String logStr) {

        Jedis jedis = null;
        Long ret = null;

        try {
            jedis = getRedis();
            ret = jedis.zadd(key, score, value);
            if (seconds > 0) {
                jedis.expire(key, seconds);
            }
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }

        return ret;
    }

    public static Long getZSetCount(final String key, final long min, final long max, final String logStr) {

        Jedis jedis = null;
        Long ret = null;
        try {
            jedis = getRedis();
            ret = jedis.zcount(key, min, max);
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return ret;
    }

    public static Set<String> getZSetByScore(final String key, final long min, final long max, final String logStr) {

        Jedis jedis = null;
        Set<String> ret = null;
        try {
            jedis = getRedis();
            ret = jedis.zrangeByScore(key, min, max);
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return ret;
    }


    public static Set<String> getRevrZSetByScore(final String key, final long min, final long max, final String logStr) {

        Jedis jedis = null;
        Set<String> ret = null;
        try {
            jedis = getRedis();
            ret = jedis.zrevrange(key, min, max);
        } catch (Exception e) {
            log.error(logStr, e);
        } finally {
            if (null != jedis) {
                jedis.close();
            }
        }
        return ret;
    }


    /**
     * 方法描述: 通过pipeline批量获取结果
     *
     * @param: [keys, logPrefix]
     * @return: java.util.List<java.lang.String>
     * @auther: fengyajun
     * @date: 2018/8/2 11:19
     */
    public static List<List<String>> pipelineMget(List<String> keys, String logPrefix) {

        Jedis jedis = getRedis();
        Response<List<String>> responseList = null;
        List<Response> responses = new ArrayList<>();

        List<List<String>> result = new ArrayList<>();

        Pipeline pipe = jedis.pipelined();

        try {

            for (String key : keys) {
                responseList = pipe.lrange(key, 0, 1000);
                responses.add(responseList);
            }
            pipe.sync();

            if (responses.isEmpty()) {
                return result;
            }

            int size = responses.size();
            List<String> list = null;

            for (int i = 0; i < size; i++) {
                list = (List<String>) responses.get(i).get();
                if (list == null || list.isEmpty()) {
                    list = new ArrayList<>();
                }
                result.add(list);

            }
        } catch (Exception e) {
            log.error(logPrefix, " desc=jedisErr e=", e.getMessage());
        } finally {
            jedis.close();
            pipe.close();
        }
        return result;
    }

    /**
     * 方法描述: 通过pipeline批量获取结果
     *
     * @param: [keys, logPrefix]
     * @return: java.util.List<java.lang.String>
     * @auther: fengyajun
     * @date: 2018/8/2 11:19
     */
    public static List<Set<String>> pipelineZgetByScore(List<String> keys, final long min, final long max, String logPrefix) {

        Jedis jedis = getRedis();
        Response<Set<String>> responseList = null;
        List<Response> responses = new ArrayList<>();

        List<Set<String>> result = new ArrayList<>();

        Pipeline pipe = jedis.pipelined();

        try {

            for (String key : keys) {
                responseList = pipe.zrangeByScore(key, min, max);
                responses.add(responseList);
            }
            pipe.sync();

            if (responses.isEmpty()) {
                return result;
            }

            Set<String> list = null;

            for (Response response : responses) {
                list = (Set<String>) response.get();
                if (list == null || list.isEmpty()) {
                    list = new HashSet<>();
                }
                result.add(list);
            }
        } catch (Exception e) {
            log.error(logPrefix, " desc=jedisErr e=", e.getMessage());
        } finally {
            jedis.close();
            pipe.close();
        }
        return result;
    }

    public static List<Set<String>> pipelineSmembersSet(List<String> keys, String logPrefix) {

        Jedis jedis = getRedis();
        Response<Set<String>> responseList = null;
        List<Response> responses = new ArrayList<>();

        List<Set<String>> result = new ArrayList<>();

        Pipeline pipe = jedis.pipelined();

        try {

            for (String key : keys) {
                responseList = pipe.smembers(key);
                responses.add(responseList);
            }
            pipe.sync();

            if (responses.isEmpty()) {
                return result;
            }

            Set<String> list = null;

            for (Response response : responses) {
                list = (Set<String>) response.get();
                if (list == null || list.isEmpty()) {
                    list = new HashSet<>();
                }
                result.add(list);
            }
        } catch (Exception e) {
            log.error(logPrefix, " desc=jedisErr e=", e.getMessage());
        } finally {
            jedis.close();
            pipe.close();
        }
        return result;
    }
}
